package com.uxsino.reactorq.event;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.uxsino.reactorq.processor.QueueProcessor;

public class EventDispatcher {
    private static Logger logger = LoggerFactory.getLogger(EventDispatcher.class);

    private class ProcessEntry<T extends Event> {
        T event;

        Consumer<T> action;

        public ProcessEntry(T event, Consumer<T> action) {
            this.event = event;
            this.action = action;
        }
    }

    private class SubscribEntry<T extends Event> {
        Class<T> cls;

        Consumer<T> action;

        public SubscribEntry(Class<T> eventClass, Consumer<T> subscriber) {
            cls = eventClass;
            action = subscriber;
        }

    }

    private Map<String, List<SubscribEntry<?>>> subscribers;

//    private TopicProcessor<ProcessEntry<?>> processor;
    private QueueProcessor<ProcessEntry<?>> processor;

    private ObjectMapper mapper;

    public EventDispatcher() {
        subscribers = new ConcurrentHashMap<>();
//        processor = ProcessUtil.<ProcessEntry<?>> createTopicProcessor("simo_event_processor");
        processor = QueueProcessor.of("simo_event_processor", 512);
        processor.subscribe(this::process);
        mapper = new ObjectMapper();
    }

    public <T extends Event> void subscribeEvent(Class<T> eventClass, Subscriber<T> subscriber) {
        subscribeEvent(eventClass, ev -> subscriber.onNext(ev));
    }

    public synchronized <T extends Event> void subscribeEvent(Class<T> eventClass, Consumer<T> subscriber) {
        String eventName = Event.getEventName(eventClass);
        if (subscribers.containsKey(eventName)) {
            subscribers.get(eventName).add(new SubscribEntry<>(eventClass, subscriber));
        } else {
            ArrayList<SubscribEntry<?>> list = new ArrayList<>();
            list.add(new SubscribEntry<>(eventClass, subscriber));
            subscribers.put(eventName, list);
        }
    }

    @SuppressWarnings("unchecked")
    public <T extends Event> void dispatch(String msg) {
        JsonNode node;
        try {
            node = mapper.readTree(msg);
        } catch (IOException e1) {
            logger.warn("not an event message: {}", msg);
            return;
        }
        JsonNode nEvent = node.get("event");

        if (nEvent == null) {
            logger.warn("not an event message: {}", msg);
            return;
        }
        String eventName = nEvent.asText();

        List<SubscribEntry<?>> list = subscribers.get(eventName);

        if (list == null) {
            return; // no body is listening
        }
        try {
            for (SubscribEntry<?> entry : list) {
                T evt = (T) mapper.treeToValue(node, entry.cls);
                processor.next(new ProcessEntry<T>(evt, ((Consumer<T>) entry.action)));
            }
        } catch (JsonProcessingException e) {
            logger.error("error parsing event data. {}", msg.length() > 200 ? msg.substring(0, 200) + "..." : msg);
        }
    }

    private <T extends Event> void process(ProcessEntry<T> entry) {
        entry.action.accept(entry.event);
    }
}
